fix(world-postgres): self-heal LISTEN/NOTIFY interruptions #2 (Open)

Conversation
The dedicated `pg.Client` used for `LISTEN/NOTIFY` is long-lived and will eventually be dropped by the server (idle TCP timeout, pgbouncer rotation, k8s CNI eviction). Previously a single drop stopped all stream delivery until process restart. Two changes make delivery durable:

1. `listenChannel` now reconnects with bounded exponential backoff (250ms → 30s cap; a sketch follows the test list below). The initial connect must succeed; subsequent reconnects are best-effort and logged.
2. `streams.get` runs a periodic `SELECT ... WHERE chunk_id > lastChunkId` as a safety net for chunks delivered while the LISTEN socket was reconnecting. The poll dedupes against in-band notifications via the existing `enqueue` ordering check. Configurable via `PostgresWorldConfig.streamPollIntervalMs` (default 5000ms; 0 to disable).

Tracks vercel#1855.

Tests cover three failure modes via testcontainers:

- polling fallback delivers chunks inserted with NOTIFY suppressed
- reader still receives chunks after `pg_terminate_backend` kills LISTEN
- `listenChannel` itself reconnects and delivers post-reconnect notifies
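A minimal sketch of the reconnect loop described in change 1, assuming plain `pg`; `makeClient`, `onNotification`, and the surrounding wiring are illustrative stand-ins, not the package's actual internals:

```ts
import { Client } from "pg";

export async function listenWithBackoff(
  makeClient: () => Client,
  channel: string,
  onNotification: (payload?: string) => void,
): Promise<() => Promise<void>> {
  let closed = false;
  let reconnecting = false;
  let client: Client | undefined;

  const connect = async (): Promise<void> => {
    const next = makeClient();
    next.on("notification", (msg) => onNotification(msg.payload));
    // Any drop of the socket (idle timeout, pgbouncer rotation, ...) triggers
    // a best-effort reconnect; 'error' and 'end' can both fire, hence the
    // `reconnecting` guard below.
    next.on("error", () => void reconnect());
    next.on("end", () => void reconnect());
    await next.connect();
    await next.query(`LISTEN "${channel}"`);
    client = next;
  };

  const reconnect = async (): Promise<void> => {
    if (closed || reconnecting) return;
    reconnecting = true;
    let delayMs = 250; // 250ms -> 30s cap, doubling each attempt
    while (!closed) {
      try {
        await connect();
        break;
      } catch {
        await new Promise((r) => setTimeout(r, delayMs));
        delayMs = Math.min(delayMs * 2, 30_000);
      }
    }
    reconnecting = false;
  };

  // The initial connect must succeed: let any error propagate to the caller.
  await connect();

  return async () => {
    closed = true;
    await client?.end();
  };
}
```

Propagating the first `connect()` error while swallowing later ones matches the stated contract: callers expect a live subscription at startup, but reconnects are best-effort.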
🧪 E2E Test Results: ❌ Some tests failed

Summary: ❌ Failed tests in 💻 Local Development (2 failed; vite-stable: 2 failed)

Details by Category:
- ❌ 💻 Local Development
- ✅ 📦 Local Production
- ✅ 🐘 Local Postgres
- ✅ 📋 Other

❌ Some E2E test jobs failed; check the workflow run for details.
📊 Benchmark Results

[Benchmark tables not recoverable: 💻 Local Development runs covering 0–50 sequential workflow steps, Promise.all / Promise.race with 10–50 concurrent steps, sequential and concurrent 10KB data payload steps, and stream benchmarks with TTFB metrics (workflow with stream, 5-transform pipeline at 1MB, 10 parallel 1MB streams, fan-out fan-in of 10 1MB streams), plus fastest-framework/fastest-world summaries.]

❌ Some benchmark jobs failed; check the workflow run for details.
`enqueue` previously decremented `offset` and returned without updating `lastChunkId`. The new polling fallback re-queries `chunk_id > lastChunkId` every tick, so chunks intentionally skipped for `startIndex` would come back on the next poll and be skipped again — double-decrementing `offset` and eventually mis-delivering them once `offset` hit zero.

Fix: move the high-water mark update to the top of `enqueue`, before the skip branch (sketched below).

Adds a regression test that pre-seeds two chunks, opens the reader with `startIndex=2`, lets several poll ticks fire (none should deliver), then writes a third chunk and asserts only the third reaches the reader.
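A sketch of the fixed ordering, with `lastChunkId` and `offset` taken from the description above; the class shape is assumed, and chunk ids are assumed to start at 1:

```ts
class ChunkReader {
  private lastChunkId = 0; // high-water mark shared by the NOTIFY and polling paths
  private offset: number;  // chunks still to skip to honor startIndex

  constructor(
    startIndex: number,
    private deliver: (data: Uint8Array) => void,
  ) {
    this.offset = startIndex;
  }

  enqueue(chunk: { chunkId: number; data: Uint8Array }): void {
    // Ordering/dedupe guard: drop anything at or below the high-water mark.
    if (chunk.chunkId <= this.lastChunkId) return;

    // The fix: advance the high-water mark *before* the skip branch, so a
    // poll tick that re-reads a skipped chunk is rejected by the guard above
    // instead of decrementing `offset` a second time.
    this.lastChunkId = chunk.chunkId;

    if (this.offset > 0) {
      this.offset -= 1; // consume the startIndex skip budget exactly once
      return;
    }

    this.deliver(chunk.data);
  }
}
```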
Two reliability issues surfaced on review:

1. After natural EOF, `streams.get` set `closed = true` and closed the controller but never cleared the polling `setInterval` or removed the EventEmitter listener. The timer kept ticking (a no-op via the `closed` guard) and the listener stayed attached for the lifetime of the process. Extracted an idempotent `stop()` that clears both, called from `cancel()` and from the EOF branch in `enqueue`. As a side benefit, the polling timer is no longer started at all if the initial chunk batch already delivered EOF.
2. `listenChannel.close()` called during an in-flight `connect()` could race: `closed = true` was set while `await next.connect()` / `LISTEN` was still resolving, after which the just-connected client would attach its notification listener and persist past close. Added a `closed` re-check after the awaits — if close raced ahead, end the client immediately and bail (sketched below).

Test: a regression test spies on `setInterval`/`clearInterval` and asserts that every interval the streamer scheduled at the configured poll cadence is cleared by the time the consumer reads `done: true`, without the consumer needing to call `cancel()`.
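A hedged sketch of the close/connect race fix (issue 2); `makeClient` and the module shape are assumptions, only the `closed` re-check mirrors the change described:

```ts
import { Client } from "pg";

export function listenChannel(makeClient: () => Client, channel: string) {
  let closed = false;
  let client: Client | undefined;

  async function connect(): Promise<void> {
    const next = makeClient();
    await next.connect();
    await next.query(`LISTEN "${channel}"`);

    // Re-check after the awaits: close() may have flipped `closed` while
    // connect()/LISTEN were still resolving. If so, tear the just-connected
    // client down instead of letting it outlive close().
    if (closed) {
      await next.end();
      return;
    }

    next.on("notification", (msg) => {
      void msg; // dispatch msg.payload to subscribers (elided)
    });
    client = next;
  }

  async function close(): Promise<void> {
    closed = true;
    await client?.end();
    client = undefined;
  }

  return { connect, close };
}
```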
Summary

Make stream delivery durable across `LISTEN/NOTIFY` interruptions in `@workflow/world-postgres`. Tracks vercel#1855.

Why

The dedicated `pg.Client` used for `LISTEN/NOTIFY` is long-lived and eventually gets dropped by the server (idle TCP timeout, pgbouncer rotation, k8s CNI eviction — see brianc/node-postgres#967). Today, a single drop stops all stream delivery until process restart.

Changes
- `listenChannel` reconnects with bounded exponential backoff (250ms → 30s cap). Initial connect must succeed (callers expect a live subscription); subsequent reconnects are best-effort and logged.
- `streams.get` runs a periodic `SELECT ... WHERE chunk_id > lastChunkId` as the always-on safety net (a sketch follows this list). NOTIFY remains the fast path; the poll catches anything dropped during reconnect, deduped by the existing `enqueue` ordering check.
- New config: `PostgresWorldConfig.streamPollIntervalMs` (default 5000ms; set to 0 to disable polling — only safe in tests where LISTEN cannot be interrupted).
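A minimal sketch of that polling tick, assuming plain `pg` and a hypothetical `stream_chunks` table; the real schema and wiring may differ:

```ts
import { Pool } from "pg";

export function startPollFallback(
  pool: Pool,
  streamId: string,
  getLastChunkId: () => number,
  enqueue: (row: { chunk_id: number; data: Buffer }) => void,
  intervalMs = 5_000, // default for PostgresWorldConfig.streamPollIntervalMs
): () => void {
  if (intervalMs === 0) return () => {}; // 0 disables polling entirely

  const timer = setInterval(async () => {
    try {
      // Catch up on anything NOTIFY missed while the LISTEN socket was down.
      const { rows } = await pool.query(
        `SELECT chunk_id, data
           FROM stream_chunks
          WHERE stream_id = $1 AND chunk_id > $2
          ORDER BY chunk_id`,
        [streamId, getLastChunkId()],
      );
      // `enqueue` dedupes rows already delivered in-band via its ordering check.
      for (const row of rows) enqueue(row);
    } catch {
      // Poll errors are non-fatal; the next tick retries.
    }
  }, intervalMs);

  return () => clearInterval(timer);
}
```

Returning the `clearInterval` closure keeps the stop path idempotent, and `streamPollIntervalMs: 0` skips scheduling the timer entirely.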
Tests

`packages/world-postgres/test/streamer.test.ts` — three integration tests against testcontainers Postgres. For example: `pg_terminate_backend` drops the LISTEN client, then `streams.write` publishes a chunk; NOTIFY fires into the void, and polling delivers it.
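For reference, the backend-kill step uses only stock Postgres catalog functions; a self-contained sketch (the connection string and helper name are placeholders, not the test file's actual code):

```ts
import { Client } from "pg";

// Terminate every backend whose last statement was a LISTEN, excluding our
// own session. Returns how many backends were killed.
async function killListenBackends(adminConnString: string): Promise<number> {
  const admin = new Client({ connectionString: adminConnString });
  await admin.connect();
  const { rowCount } = await admin.query(
    `SELECT pg_terminate_backend(pid)
       FROM pg_stat_activity
      WHERE pid <> pg_backend_pid()
        AND query ILIKE 'LISTEN%'`,
  );
  await admin.end();
  return rowCount ?? 0;
}
```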
Test plan

- `pnpm exec tsc --noEmit` — clean
- `pnpm exec biome check` — clean
- `pnpm exec vitest run test/streamer.test.ts` — 3/3 passing in ~6.5s
- `storage.test.ts` and `spec.test.ts` unaffected (no shared state)
- Based on `main` of this fork, ready to send upstream as a PR to `vercel/workflow`

🤖 Generated with Claude Code